package io.openmessaging.demo;

import io.openmessaging.BytesMessage;
import io.openmessaging.KeyValue;
import io.openmessaging.MessageHeader;

import java.io.UnsupportedEncodingException;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Set;

/**
 * Binary codec that writes a {@link BytesMessage} into a {@link ByteBuffer} and reads it back.
 *
 * <p>Layout of one encoded message, in order:
 * <pre>
 *   header count     1 byte, unsigned
 *   header entry *   key code (1 byte) | type tag (1 byte) | value
 *   property count   1 byte, unsigned
 *   property entry * key length (1 byte, unsigned) | key UTF-8 bytes | type tag (1 byte) | value
 *   body length      2 bytes, unsigned
 *   body bytes
 * </pre>
 * A value is a 4-byte int, an 8-byte long, an 8-byte double, or a length-prefixed UTF-8 string.
 * The string length prefix is 2 bytes (unsigned) for headers and 1 byte (unsigned) for properties.
 *
 * <p>Anything that does not fit its length prefix is rejected with an
 * {@link IllegalArgumentException} instead of being written truncated, because a truncated
 * prefix followed by the full payload cannot be decoded again.
 *
 * Author :  Rocky
 * Date : 20/05/2017 14:01
 */
public class MessageUtils2 {

    /** Type tags written in front of every header/property value. */
    private static final byte INT_VALUE = 0;
    private static final byte LONG_VALUE = 1;
    private static final byte DOUBLE_VALUE = 2;
    private static final byte STRING_VALUE = 3;

    /** Largest value representable by a one-byte unsigned length/count prefix. */
    private static final int MAX_UNSIGNED_BYTE = 0xFF;
    /** Largest value representable by a two-byte unsigned length prefix. */
    private static final int MAX_UNSIGNED_SHORT = 0xFFFF;

    /**
     * Appends {@code msg} to {@code mbb} at the buffer's current position.
     *
     * <p>On success the new position is also stored as an int at absolute index 0 of the buffer.
     * NOTE(review): this presumably relies on the caller reserving the first four bytes of the
     * buffer for that bookkeeping value - confirm against the callers.
     *
     * <p>On any failure the buffer position is restored to where it was on entry, so a partially
     * written message is never left behind.
     *
     * @param msg the message to encode
     * @param mbb the destination buffer
     * @return {@code true} if the whole message was written; {@code false} if the buffer has no
     *         remaining space or overflowed while writing
     * @throws IllegalArgumentException if a count, key, string value or body is too large for its
     *                                  length prefix
     * @throws RuntimeException         if a header/property value has an unsupported type
     */
    public static boolean writeMessage(BytesMessage msg, ByteBuffer mbb) {
        if (!mbb.hasRemaining()) {
            return false;
        }

        int oldPos = mbb.position();
        boolean written = false;
        try {
            writeMsgHeader(msg.headers(), mbb);
            writeMsgProperties(msg.properties(), mbb);
            writeBody(msg.getBody(), mbb);

            mbb.putInt(0, mbb.position());
            written = true;
            return true;
        } catch (BufferOverflowException e) {
            // Not enough room for this message; the position is rolled back below.
            return false;
        } finally {
            if (!written) {
                // Roll back on overflow and on any runtime failure alike.
                mbb.position(oldPos);
            }
        }
    }

    /**
     * Decodes one message starting at the buffer's current position.
     *
     * @param mbb the source buffer, positioned at the start of an encoded message
     * @return the decoded message, or {@code null} if decoding failed for any reason (the stack
     *         trace is printed; the buffer position is left wherever decoding stopped)
     */
    public static BytesMessage readMessage(ByteBuffer mbb) {
        try {
            BytesMessage result = new DefaultBytesMessage();

            int headerSize = unsignedByteToInt(mbb.get());
            if (headerSize > 0) {
                readMsgHeader(result.headers(), headerSize, mbb);
            }
            int propertySize = unsignedByteToInt(mbb.get());
            if (propertySize > 0) {
                readMsgProperties(result.properties(), propertySize, mbb);
            }
            int bodySize = unsignedShortToInt(mbb.getShort());
            if (bodySize > 0) {
                result.setBody(readBody(bodySize, mbb));
            }
            return result;
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Writes the header section: a one-byte entry count followed by the entries. A null or empty
     * header set is written as a single zero count byte.
     */
    private static void writeMsgHeader(KeyValue headers, ByteBuffer mbb) {
        if (headers == null || headers.keySet().isEmpty()) {
            mbb.put((byte) 0);
            return;
        }

        Set<String> headerKeys = headers.keySet();
        if (headerKeys.size() > MAX_UNSIGNED_BYTE) {
            throw new IllegalArgumentException(
                    "too many headers for a one-byte count: " + headerKeys.size());
        }
        mbb.put((byte) headerKeys.size()); // read back as an unsigned byte
        for (String headerKey : headerKeys) {
            mbb.put(encodeMsgHeaderKey(headerKey));
            // Header strings carry a two-byte length prefix.
            writeValue(headerKey, getValue(headers, headerKey), true, mbb);
        }
    }

    /**
     * Reads {@code size} header entries from {@code mbb} into {@code kvs}.
     *
     * @return {@code kvs}, for convenience
     */
    private static KeyValue readMsgHeader(KeyValue kvs, int size, ByteBuffer mbb) {
        for (int i = 0; i < size; i++) {
            String key = decodeMsgHeaderKey(mbb.get());
            readValue(kvs, key, true, "不支持的数据类型:", mbb);
        }
        return kvs;
    }

    /**
     * Writes the property section: a one-byte entry count followed by the entries. A null or
     * empty property set is written as a single zero count byte.
     */
    private static void writeMsgProperties(KeyValue properties, ByteBuffer mbb) {
        if (properties == null || properties.keySet().isEmpty()) {
            mbb.put((byte) 0);
            return;
        }

        Set<String> propertyKeys = properties.keySet();
        if (propertyKeys.size() > MAX_UNSIGNED_BYTE) {
            throw new IllegalArgumentException(
                    "too many properties for a one-byte count: " + propertyKeys.size());
        }
        mbb.put((byte) propertyKeys.size()); // read back as an unsigned byte
        for (String propertyKey : propertyKeys) {
            byte[] keyBytes = propertyKey.getBytes(StandardCharsets.UTF_8);
            if (keyBytes.length > MAX_UNSIGNED_BYTE) {
                throw new IllegalArgumentException("property key [" + propertyKey
                        + "] encodes to " + keyBytes.length + " bytes, limit is " + MAX_UNSIGNED_BYTE);
            }
            mbb.put((byte) keyBytes.length); // read back as an unsigned byte
            mbb.put(keyBytes);

            // Property strings carry a one-byte length prefix.
            writeValue(propertyKey, getValue(properties, propertyKey), false, mbb);
        }
    }

    /**
     * Reads {@code size} property entries from {@code mbb} into {@code kvs}.
     *
     * @return {@code kvs}, for convenience
     */
    private static KeyValue readMsgProperties(KeyValue kvs, int size, ByteBuffer mbb) {
        for (int i = 0; i < size; i++) {
            int keyByteSize = unsignedByteToInt(mbb.get());
            byte[] keyBytes = new byte[keyByteSize];
            mbb.get(keyBytes);
            String key = new String(keyBytes, StandardCharsets.UTF_8);

            readValue(kvs, key, false, "不支持的数据类型 ", mbb);
        }
        return kvs;
    }

    /**
     * Writes a type tag followed by the encoded value.
     *
     * @param key              key the value belongs to; used only in error messages
     * @param value            a String, Integer, Long or Double
     * @param wideStringLength {@code true} to prefix strings with a two-byte length,
     *                         {@code false} for a one-byte length
     * @throws IllegalArgumentException if a string does not fit its length prefix
     * @throws RuntimeException         if {@code value} is null or of an unsupported type
     */
    private static void writeValue(String key, Object value, boolean wideStringLength, ByteBuffer mbb) {
        if (value instanceof String) {
            byte[] strBytes = ((String) value).getBytes(StandardCharsets.UTF_8);
            int limit = wideStringLength ? MAX_UNSIGNED_SHORT : MAX_UNSIGNED_BYTE;
            if (strBytes.length > limit) {
                throw new IllegalArgumentException("value of key [" + key + "] encodes to "
                        + strBytes.length + " bytes, limit is " + limit);
            }
            mbb.put(STRING_VALUE);
            if (wideStringLength) {
                mbb.putShort((short) strBytes.length); // read back as an unsigned short
            } else {
                mbb.put((byte) strBytes.length); // read back as an unsigned byte
            }
            mbb.put(strBytes);
        } else if (value instanceof Integer) {
            mbb.put(INT_VALUE);
            mbb.putInt((Integer) value);
        } else if (value instanceof Long) {
            mbb.put(LONG_VALUE);
            mbb.putLong((Long) value);
        } else if (value instanceof Double) {
            mbb.put(DOUBLE_VALUE);
            mbb.putDouble((Double) value);
        } else {
            // Guard against null so the failure reports the real problem instead of an NPE.
            throw new RuntimeException(
                    "不支持的数据类型 : " + (value == null ? "null" : value.getClass().getName()));
        }
    }

    /**
     * Reads a type tag and the value that follows it, and stores the value in {@code target}
     * under {@code key} using the matching typed {@code put} overload.
     *
     * @param wideStringLength      {@code true} if strings carry a two-byte length prefix,
     *                              {@code false} for a one-byte prefix
     * @param unsupportedTypePrefix message prefix used when the type tag is not recognised
     * @throws RuntimeException if the type tag is not one of the known tags
     */
    private static void readValue(KeyValue target, String key, boolean wideStringLength,
                                  String unsupportedTypePrefix, ByteBuffer mbb) {
        byte dataType = mbb.get();
        switch (dataType) {
            case STRING_VALUE:
                int valueByteSize = wideStringLength
                        ? unsignedShortToInt(mbb.getShort())
                        : unsignedByteToInt(mbb.get());
                byte[] valueBytes = new byte[valueByteSize];
                mbb.get(valueBytes);
                target.put(key, new String(valueBytes, StandardCharsets.UTF_8));
                return;

            case INT_VALUE:
                target.put(key, mbb.getInt());
                return;

            case LONG_VALUE:
                target.put(key, mbb.getLong());
                return;

            case DOUBLE_VALUE:
                target.put(key, mbb.getDouble());
                return;

            default:
                throw new RuntimeException(unsupportedTypePrefix + dataType);
        }
    }

    /**
     * Writes the body as a two-byte unsigned length followed by the bytes. A null body is written
     * as length zero.
     *
     * @throws IllegalArgumentException if the body is longer than 65535 bytes
     */
    private static void writeBody(byte[] body, ByteBuffer mbb) {
        if (body == null) {
            mbb.putShort((short) 0);
            return;
        }
        if (body.length > MAX_UNSIGNED_SHORT) {
            throw new IllegalArgumentException("message body is " + body.length
                    + " bytes, limit is " + MAX_UNSIGNED_SHORT);
        }
        mbb.putShort((short) body.length); // read back as an unsigned short
        mbb.put(body);
    }

    /** Reads exactly {@code bodySize} bytes from {@code mbb}. */
    private static byte[] readBody(int bodySize, ByteBuffer mbb) {
        byte[] result = new byte[bodySize];
        mbb.get(result);
        return result;
    }

    /**
     * Fetches the value stored under {@code key} by probing the typed getters in the order
     * String, long, double, int; the first getter that does not throw wins.
     *
     * <p>NOTE(review): this assumes the {@code KeyValue} implementation throws (for example a
     * {@code ClassCastException}) when a getter is asked for a value of another type - confirm
     * against the implementation in use.
     */
    private static Object getValue(KeyValue kvs, String key) {
        try {
            return kvs.getString(key);
        } catch (Exception ignored) {
            // Not readable as a String; try the next type.
        }
        try {
            return kvs.getLong(key);
        } catch (Exception ignored) {
            // Not readable as a long; try the next type.
        }
        try {
            return kvs.getDouble(key);
        } catch (Exception ignored) {
            // Not readable as a double; the int getter is the last resort.
        }
        return kvs.getInt(key);
    }

    /**
     * Maps a built-in header key to its one-byte code (1-17).
     *
     * <p>NOTE(review): any other key is written as -1 after printing a notice, and
     * {@link #decodeMsgHeaderKey(byte)} turns -1 back into a fixed placeholder string, so the
     * original name of a non-built-in header key does not survive a round trip.
     */
    private static byte encodeMsgHeaderKey(String headerKey) {
        switch (headerKey) {
            case MessageHeader.MESSAGE_ID:
                return 1;

            case MessageHeader.TOPIC:
                return 2;

            case MessageHeader.QUEUE:
                return 3;

            case MessageHeader.BORN_TIMESTAMP:
                return 4;

            case MessageHeader.BORN_HOST:
                return 5;

            case MessageHeader.STORE_TIMESTAMP:
                return 6;

            case MessageHeader.STORE_HOST:
                return 7;

            case MessageHeader.START_TIME:
                return 8;

            case MessageHeader.STOP_TIME:
                return 9;

            case MessageHeader.TIMEOUT:
                return 10;

            case MessageHeader.PRIORITY:
                return 11;

            case MessageHeader.RELIABILITY:
                return 12;

            case MessageHeader.SEARCH_KEY:
                return 13;

            case MessageHeader.SCHEDULE_EXPRESSION:
                return 14;

            case MessageHeader.SHARDING_KEY:
                return 15;

            case MessageHeader.SHARDING_PARTITION:
                return 16;

            case MessageHeader.TRACE_ID:
                return 17;

            default:
                System.out.println("非指定的key");
                return -1;
        }
    }

    /**
     * Maps a one-byte header key code (1-17) back to the built-in header key. Any other code
     * yields a fixed placeholder string.
     */
    private static String decodeMsgHeaderKey(byte value) {
        switch (value) {
            case 1:
                return MessageHeader.MESSAGE_ID;

            case 2:
                return MessageHeader.TOPIC;

            case 3:
                return MessageHeader.QUEUE;

            case 4:
                return MessageHeader.BORN_TIMESTAMP;

            case 5:
                return MessageHeader.BORN_HOST;

            case 6:
                return MessageHeader.STORE_TIMESTAMP;

            case 7:
                return MessageHeader.STORE_HOST;

            case 8:
                return MessageHeader.START_TIME;

            case 9:
                return MessageHeader.STOP_TIME;

            case 10:
                return MessageHeader.TIMEOUT;

            case 11:
                return MessageHeader.PRIORITY;

            case 12:
                return MessageHeader.RELIABILITY;

            case 13:
                return MessageHeader.SEARCH_KEY;

            case 14:
                return MessageHeader.SCHEDULE_EXPRESSION;

            case 15:
                return MessageHeader.SHARDING_KEY;

            case 16:
                return MessageHeader.SHARDING_PARTITION;

            case 17:
                return MessageHeader.TRACE_ID;

            default:
                return "非指定的key";
        }
    }

    /** Interprets {@code num} as an unsigned 8-bit value (0-255). */
    private static int unsignedByteToInt(byte num) {
        return num & 0xFF;
    }

    /** Interprets {@code num} as an unsigned 16-bit value (0-65535). */
    private static int unsignedShortToInt(short num) {
        return num & 0xFFFF;
    }
}
